/*
 *    	@(#)@PayInInventoryConsumer.java  2017年10月27日
 *     
 *      @COPYRIGHT@
 */
package com.capinfo.accumulation.message.consumer.collection;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;

import com.capinfo.accumulation.model.collection.CompanyAccountFund;
import com.capinfo.accumulation.model.collection.Payinitem;
import com.capinfo.accumulation.model.collection.PersonAccountFund;
import com.capinfo.accumulation.service.collection.PayininfoService;
import com.capinfo.accumulation.service.collection.PayinitemService;
import com.capinfo.accumulation.service.collection.PersonAccountFundService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;

import com.capinfo.accumulation.message.model.collection.PayInInventoryMessage;
import com.capinfo.accumulation.model.collection.Person;
import com.capinfo.components.kafka.consumer.SimpleConsumer;
import com.capinfo.components.kafka.consumer.impl.SimpleConsumerImpl;
import com.capinfo.framework.dao.SearchCriteriaBuilder;
import com.capinfo.framework.dao.impl.restriction.RestrictionExpression;
import com.capinfo.framework.service.GeneralService;
import com.capinfo.framework.util.ApplicationContextUtils;
import com.capinfo.framework.util.LogUtils;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Kafka consumer that turns each {@link PayInInventoryMessage} into pay-in
 * inventory items.
 * <p>
 * For every message read from the stream, the consumer pages through the
 * {@link PersonAccountFund} records whose {@code companyAccountFundId} equals
 * the message's company account id and whose {@code isPayment} equals 1, hands
 * each page to {@link PayinitemService#saveOrUpdateBatch}, and then commits the
 * consumed offsets through the connector.
 * </p>
 * <p>
 * Exceptions thrown by the services are not caught here: they propagate out of
 * {@link #run()} before the offset of the failing message is committed.
 * </p>
 * 
 * @version 1.0
 * @author 张名扬
 */
public class PayInInventoryConsumer extends SimpleConsumerImpl<Serializable, PayInInventoryMessage> {

	// Log under this class's own category (was SimpleConsumer.class, a copy-paste slip).
	private static final Log logger = LogFactory.getLog(PayInInventoryConsumer.class);

	/** Row offset at which paging starts for every message. */
	private static final int FIRST_RESULT = 0;

	/** Number of records requested per query and passed to one batch save. */
	private static final int PAGE_SIZE = 1;

	private ConsumerConnector connector;
	private KafkaStream<Serializable, PayInInventoryMessage> stream;

	private ApplicationContext context = ApplicationContextUtils.getApplicationContext();

	// Not used by this class's own methods; the lookup is kept so construction
	// behaves exactly as before.
	private GeneralService generalService = (GeneralService) context.getBean("generalService");
	private PayinitemService payinitemService = (PayinitemService) context.getBean("payinitemService");

	private PersonAccountFundService personAccountFundService = (PersonAccountFundService) context.getBean("personAccountFundService");

	public PayInInventoryConsumer() {
	}

	/**
	 * Binds this consumer to the connector and stream it will read from.
	 * 
	 * @param connector connector used to commit offsets after each message
	 * @param stream    stream whose messages {@link #run()} iterates over
	 * @return this consumer, ready to be executed as a {@link Runnable}
	 */
	@Override
	public Runnable initiate(ConsumerConnector connector, KafkaStream<Serializable, PayInInventoryMessage> stream) {

		this.connector = connector;
		this.stream = stream;

		return this;
	}

	/**
	 * Processes messages from the stream until its iterator reports no more
	 * elements. Each message is logged at debug level, its inventory is built,
	 * and the offsets are committed afterwards.
	 * 
	 * @see java.lang.Runnable#run()
	 */
	@Override
	public void run() {

		ConsumerIterator<Serializable, PayInInventoryMessage> it = stream.iterator();
		while (it.hasNext()) {

			MessageAndMetadata<Serializable, PayInInventoryMessage> message = it.next();
			// Read the payload once and reuse it instead of calling message() repeatedly.
			PayInInventoryMessage payload = message.message();

			LogUtils.debugMessage(logger, describe(message, payload));

			if (payload == null) {
				// Nothing to build from; skip it rather than fail with a NullPointerException.
				logger.warn("Skipping message without payload; topic: " + message.topic() + "; partition: "
						+ message.partition() + "; offset: " + message.offset());
			} else {
				SearchCriteriaBuilder<PersonAccountFund> criteriaBuilder = new SearchCriteriaBuilder<>(
						PersonAccountFund.class);

				criteriaBuilder.addQueryCondition("companyAccountFundId", RestrictionExpression.EQUALS_OP,
						payload.getCompanyAccountId());
				criteriaBuilder.addQueryCondition("isPayment", RestrictionExpression.EQUALS_OP, 1L);
				criteriaBuilder.addLimitCondition(FIRST_RESULT, PAGE_SIZE);

				buildInventory(criteriaBuilder, payload.getCompanyAccount(), payload.getPayinfoId(), FIRST_RESULT,
						PAGE_SIZE);
			}

			connector.commitOffsets();
		}
	}

	/**
	 * Builds the debug line describing a consumed message.
	 * 
	 * @param message the consumed record, source of topic, partition, offset and key
	 * @param payload the record's payload as already read by the caller; may be null
	 * @return the text to log
	 */
	private String describe(MessageAndMetadata<Serializable, PayInInventoryMessage> message,
			PayInInventoryMessage payload) {
		StringBuilder builder = new StringBuilder();
		builder.append(Thread.currentThread());
		builder.append("----> ");
		builder.append("topic: ");
		builder.append(message.topic());
		builder.append("; partition: ");
		builder.append(message.partition());
		builder.append("; offset: ");
		builder.append(message.offset());
		builder.append("; key: ");
		builder.append(message.key());
		builder.append("; message: ");
		builder.append(payload);
		return builder.toString();
	}

	/**
	 * Pages through the records matched by {@code criteriaBuilder} and saves
	 * each non-empty page as inventory items.
	 * <p>
	 * Implemented as a loop: the earlier recursive form added one stack frame
	 * per page, so its depth grew with the number of matching records.
	 * </p>
	 * 
	 * @param criteriaBuilder criteria whose limit condition is already set for
	 *                        the first page; its limit is advanced after every page
	 * @param companyAccount  company account passed through to the batch save
	 * @param payinfoId       pay-in info id passed through to the batch save
	 * @param startPosition   offset of the first page
	 * @param maxResultNum    page size requested for each following query
	 */
	private void buildInventory(SearchCriteriaBuilder<PersonAccountFund> criteriaBuilder, String companyAccount,
			Long payinfoId, int startPosition, int maxResultNum) {
		int position = startPosition;
		while (true) {
			List<PersonAccountFund> personAccountFunds = personAccountFundService
					.getPersonAccountBycri(criteriaBuilder);
			// A missing or empty page means there is nothing left to process.
			if (personAccountFunds == null || personAccountFunds.isEmpty()) {
				return;
			}

			payinitemService.saveOrUpdateBatch(personAccountFunds, companyAccount, payinfoId);

			// Advance by what was actually returned, then request the next page.
			position += personAccountFunds.size();
			criteriaBuilder.addLimitCondition(position, maxResultNum);
		}
	}
}
